df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('train.csv')
Read CSV files and create a DataFrame manually.
In [1]:
import csv
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from StringIO import StringIO
from datetime import *
from dateutil.parser import parse
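Note that the StringIO import above is Python 2 specific; if you run this under Python 3 (an assumption about your environment, the notebook itself targets Python 2) the equivalent import would be:
# Python 3 replacement for the Python 2 StringIO import used above
from io import StringIO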
Initialize the Spark and SQL contexts and load the input file:
In [2]:
sc = pyspark.SparkContext('local[*]')  # run Spark in local mode, using all available cores
sqlContext = SQLContext(sc)
textRDD = sc.textFile("../../data/sf-crime/train.csv.bz2")  # Spark decompresses the bz2 archive transparently
textRDD.count()
Out[2]:
Remove the header row from the input file:
In [3]:
header = textRDD.first()
textRDD = textRDD.filter(lambda line: line != header)  # keep every line except the header
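The filter above drops every line that equals the header string, which would also remove a data row that happened to match it exactly. A minimal alternative sketch (illustrative only, not used in the rest of this notebook; noHeaderRDD is a hypothetical name) removes just the physical first line via zipWithIndex:
# alternative (illustrative only): drop just the first line by its index
noHeaderRDD = sc.textFile("../../data/sf-crime/train.csv.bz2") \
    .zipWithIndex() \
    .filter(lambda pair: pair[1] > 0) \
    .map(lambda pair: pair[0])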
Define the data schema:
In [4]:
# build the schema from the header: every column defaults to a string,
# then override the types of the timestamp column and the two float columns
fields = [StructField(field_name, StringType(), True) for field_name in header.split(',')]
fields[0].dataType = TimestampType()
fields[7].dataType = FloatType()
fields[8].dataType = FloatType()
schema = StructType(fields)
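For readers who prefer to see the schema spelled out, the result is equivalent to the explicit definition below. The column names are assumed from the standard Kaggle SF crime dataset rather than taken from this notebook, which derives them from the header line.
# illustrative equivalent of the schema built above (column names assumed)
explicit_schema = StructType([
    StructField('Dates', TimestampType(), True),
    StructField('Category', StringType(), True),
    StructField('Descript', StringType(), True),
    StructField('DayOfWeek', StringType(), True),
    StructField('PdDistrict', StringType(), True),
    StructField('Resolution', StringType(), True),
    StructField('Address', StringType(), True),
    StructField('X', FloatType(), True),
    StructField('Y', FloatType(), True),
])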
Parse CSV lines and transform values into tuples:
In [5]:
# parse each CSV line (fields may contain ',' characters enclosed in quotes) and convert the values into typed tuples
tupleRDD = textRDD \
.map(lambda line: next(csv.reader(StringIO(line)))) \
.map(lambda x: (parse(x[0]), x[1], x[2], x[3], x[4], x[5], x[6], float(x[7]), float(x[8])))
df = sqlContext.createDataFrame(tupleRDD, schema)
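Before writing the data out, a quick sanity check (a small sketch, not part of the original notebook output) confirms the schema and a few parsed rows:
# inspect the resulting schema and the first few rows
df.printSchema()
df.show(5)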
Write the DataFrame as a Parquet file:
In [6]:
df.write.save("../../data/sf-crime/train.parquet")
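To verify the output, the Parquet file can be read back into a DataFrame (a short sketch assuming the same sqlContext is still active):
# read the Parquet file back and compare the row count with the source data
parquetDF = sqlContext.read.parquet("../../data/sf-crime/train.parquet")
parquetDF.count()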